package com.mystudy.gmall.realtime.app

import java.lang
import java.text.SimpleDateFormat
import java.util.Date

import com.alibaba.fastjson.{JSON, JSONObject}
import com.mystudy.gmall.realtime.bean.DauInfo
import com.mystudy.gmall.realtime.utils.{MyESUtil, MyKafkaUtil, MyRedisUtil, OffsetManagerUtil}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import redis.clients.jedis.Jedis

import scala.collection.mutable.ListBuffer

/**
 * Author: Felix
 * Desc: DailyActiveUser  日活用户统计业务类
 */
object DauApp {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[4]").setAppName("dau_app").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    //===============消费Kafka数据基本实现===================
    val groupId = "gmall_dau_bak"
    val topic = "gmall_start_bak"

    //从Redis中读取Kafka偏移量
    val kafkaOffsetMap: Map[TopicPartition, Long] = OffsetManagerUtil.getOffset(topic,groupId)
    var recordDstream: InputDStream[ConsumerRecord[String, String]] = null
    if(kafkaOffsetMap!=null&&kafkaOffsetMap.size>0){
      //Redis中有偏移量  根据Redis中保存的偏移量读取
      recordDstream = MyKafkaUtil.getKafkaStream(topic, ssc,kafkaOffsetMap,groupId)
    }else{
      // Redis中没有保存偏移量  Kafka默认从最新读取
      recordDstream = MyKafkaUtil.getKafkaStream(topic, ssc,groupId)
    }

    //得到本批次中处理数据的分区对应的偏移量起始及结束位置
    // 注意：这里我们从Kafka中读取数据之后，直接就获取了偏移量的位置，因为KafkaRDD可以转换为HasOffsetRanges，会自动记录位置
    var offsetRanges: Array[OffsetRange] = Array.empty[OffsetRange]
    val offsetDStream: DStream[ConsumerRecord[String, String]] = recordDstream.transform {
      rdd => {
        offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        println(offsetRanges(0).untilOffset + "*****")
        rdd
      }
    }

    //测试输出1
    //recordDstream.map(_.value()).print(100)

    val jsonObjDStream: DStream[JSONObject] = offsetDStream.map { record =>
      //获取启动日志
      val jsonStr: String = record.value()
      //将启动日志转换为json对象
      val jsonObj: JSONObject = JSON.parseObject(jsonStr)
      //获取时间戳 毫秒数
      val ts: lang.Long = jsonObj.getLong("ts")
      //获取字符串   日期 小时
      val dateHourString: String = new SimpleDateFormat("yyyy-MM-dd HH").format(new Date(ts))
      //对字符串日期和小时进行分割，分割后放到json对象中，方便后续处理
      val dateHour: Array[String] = dateHourString.split(" ")
      jsonObj.put("dt",dateHour(0))
      jsonObj.put("hr",dateHour(1))
      jsonObj
    }
    //测试输出2
    //jsonObjDStream.print()

    //===============使用Redis进行去重===================
    /*
    //方案1   缺点：虽然我们从池中获取Reids，但是每次从流中取数据都进行过滤，连接还是过于频繁
    val filteredDStream: DStream[JSONObject] = jsonObjDStream.filter {
      jsonObj => {
        //获取当前日期
        val dt: String = jsonObj.getString("dt")
        //获取设备mid
        val mid: String = jsonObj.getJSONObject("common").getString("mid")
        //获取Redis客户端
        val jedisClient: Jedis = MyRedisUtil.getJedisClient
        //拼接向Redis放的数据的key
        val dauKey: String = "dau:" + dt
        //判断Redis中是否存在该数据
        val isNew: lang.Long = jedisClient.sadd(dauKey,mid)
        //设置当天的key数据失效时间为24小时
        jedisClient.expire(dauKey,3600*24)
        jedisClient.close()
        if (isNew == 1L) {
          //Reids不存在，我们需要从DS流中将数据过滤出来，同时数据会保存到Redis中
          true
        } else {
          //Reids中已经存在该，我们需要把该数据从DS流中过滤掉
          false
        }
      }
    }
    //输出测试    数量会越来越少，最后变为0   因为我们mid只是模拟了50个
    filteredDStream.count().print()*/

    //方案2   以分区为单位进行过滤，可以减少和连接池交互的次数
    val filteredDStream: DStream[JSONObject] = jsonObjDStream.mapPartitions {
      jsonObjItr => {
        //获取Redis客户端
        val jedisClient: Jedis = MyRedisUtil.getJedisClient
        //定义当前分区过滤后的数据
        val filteredList: ListBuffer[JSONObject] = new ListBuffer[JSONObject]
        for (jsonObj <- jsonObjItr) {
          //获取当前日期
          val dt: String = jsonObj.getString("dt")
          //获取设备mid
          val mid: String = jsonObj.getJSONObject("common").getString("mid")
          //拼接向Redis放的数据的key
          val dauKey: String = "dau:" + dt
          //判断Redis中是否存在该数据
          val isNew: lang.Long = jedisClient.sadd(dauKey,mid)
          //设置当天的key数据失效时间为24小时
          jedisClient.expire(dauKey,3600*24)
          if (isNew == 1L) {
            //如果Redis中不存在，那么将数据添加到新建的ListBuffer集合中，实现过滤的效果
            filteredList.append(jsonObj)
          }
        }
        jedisClient.close()
        filteredList.toIterator
      }
    }
    //输出测试    数量会越来越少，最后变为0   因为我们mid只是模拟了50个
    //filteredDStream.count().print()


    //===============向ES中保存数据===================
    filteredDStream.foreachRDD{
      rdd=>{//获取DS中的RDD
        rdd.foreachPartition{//以分区为单位对RDD中的数据进行处理
          jsonItr=>{
            //将每个分区的数据转换为List方便进行处理
            val dauList: List[(String,DauInfo)] = jsonItr.map {
              jsonObj => {
                //每次处理的是一个json对象   将json对象封装为样例类
                val commonJsonObj: JSONObject = jsonObj.getJSONObject("common")
                val dauInfo=DauInfo(
                  commonJsonObj.getString("mid"),
                  commonJsonObj.getString("uid"),
                  commonJsonObj.getString("ar"),
                  commonJsonObj.getString("ch"),
                  commonJsonObj.getString("vc"),
                  jsonObj.getString("dt"),
                  jsonObj.getString("hr"),
                  "00", //分钟我们前面没有转换，默认00
                  jsonObj.getLong("ts")
                )
                (dauInfo.mid,dauInfo)
              }
            }.toList
            //对分区的数据进行批量处理
            //获取当前日志字符串
            val dt: String = new SimpleDateFormat("yyyy-MM-dd").format(new Date())
            MyESUtil.bulkInsert(dauList,"gmall2020_dau_info_" + dt)
          }
        }
        //在保存最后提交偏移量
        OffsetManagerUtil.saveOffset(topic,groupId,offsetRanges)
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}